package com.chenjl.trace.transport;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.chenjl.trace.Constant;
/**
 * Kafka消息生产者
 * 2018-9-25 18:50:42
 * @author chenjinlong
 */
public class KafkaMessageProducer {
	private static final Logger log = LoggerFactory.getLogger(KafkaMessageProducer.class);
	
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,Constant.BROKER_URL);
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	    
		Producer<String,String> producer = new KafkaProducer<String,String>(props);
		
		
		//执行send时，并不是直接就向socket发起网络请求，而是先把数据存储到发送的缓冲区中，这个缓冲区的实现是一个RecordAccumulator实例
		ProducerRecord<String, String> hello= new ProducerRecord<String, String>(Constant.TOPIC_NAME_STR,"hello key","hello value");
		producer.send(hello,new Callback() {
			@Override
			public void onCompletion(RecordMetadata metadata,Exception exception) {
				log.info("hello message , offset : {} , topic : {}",metadata.offset(),metadata.topic());
			}
		});
		
		SimpleDateFormat simpleDateFormat =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		
		for(int i=1;i<=20;i++) {
			String messageKey = "message k :"+i;
			String messageValue = simpleDateFormat.format(new Date());
			ProducerRecord<String, String> message = new ProducerRecord<String, String>(Constant.TOPIC_NAME_STR,messageKey,messageValue);
			
			//异步发送, 同步等待
			Future<RecordMetadata> future = producer.send(message);
			RecordMetadata recordMetadata = future.get();
			log.info("kafka send message complete..  offset : {} , topic : {} , partition : {}",recordMetadata.offset(),recordMetadata.topic(),recordMetadata.partition());
			
			Thread.sleep(1000);
		}
		
		
		log.info("send end");
		producer.close();
	}
}